---
slug: reduce-agg
title: reduce_agg lambda aggregate function
authors: [mbasmanova]
tags: [tech-blog,functions]
---

## Definition

<a href="https://facebookincubator.github.io/velox/functions/presto/aggregate.html#reduce_agg">Reduce_agg</a>
is the only lambda aggregate Presto function. It allows users to define arbitrary aggregation
logic using 2 lambda functions.

```
reduce_agg(inputValue T, initialState S, inputFunction(S, T, S), combineFunction(S, S, S)) → S

Reduces all non-NULL input values into a single value. inputFunction will be invoked for
each non-NULL input value. If all inputs are NULL, the result is NULL. In addition to taking
the input value, inputFunction takes the current state, initially initialState, and returns the
new state. combineFunction will be invoked to combine two states into a new state. The final
state is returned. Throws an error if initialState is NULL or inputFunction or combineFunction
returns a NULL.
```

Once can think of reduce_agg as using inputFunction to implement partial aggregation and
combineFunction to implement final aggregation. Partial aggregation processes a list of
input values and produces an intermediate state:

```
auto s = initialState;
for (auto x : input) {
   s = inputFunction(s, x);
}

return s;
```

Final aggregation processes a list of intermediate states and computes the final state.

```
auto s = intermediates[0];
for (auto i = 1; i < intermediates.size(); ++i)
   s = combineFunction(s, intermediates[i]);
}

return s;
```

For example, one can implement SUM aggregation using reduce_agg as follows:

```
reduce_agg(c, 0, (s, x) -> s + x, (s, s2) -> s + s2)
```

Implementation of AVG aggregation is a bit trickier. For AVG, state is a tuple of sum and
count. Hence, reduce_agg can be used to compute (sum, count) pair, but it cannot compute
the actual average. One needs to apply a scalar function on top of reduce_agg to get the
average.

```
SELECT id, sum_and_count.sum / sum_and_count.count FROM (
  SELECT id, reduce_agg(value, CAST(row(0, 0) AS row(sum double, count bigint)),
    (s, x) -> CAST(row(s.sum + x, s.count + 1) AS row(sum double, count bigint)),
    (s, s2) -> CAST(row(s.sum + s2.sum, s.count + s2.count) AS row(sum double, count bigint))) AS sum_and_count
  FROM t
  GROUP BY id
);
```

The examples of using reduce_agg to compute SUM and AVG are for illustrative purposes.
One should not use reduce_agg if a specialized aggregation function is available.

One use case for reduce_agg we see in production is to compute a product of input values.

```
reduce_agg(c, 1.0, (s, x) -> s * x, (s, s2) -> s * s2)
```

Another example is to compute a list of top N distinct values from all input arrays.

```
reduce_agg(x, array[],
            (a, b) -> slice(reverse(array_sort(array_distinct(concat(a, b)))), 1, 1000),
            (a, b) -> slice(reverse(array_sort(array_distinct(concat(a, b)))), 1, 1000))
```

Note that this is equivalent to the following query:

```
SELECT array_agg(v) FROM (
    SELECT DISTINCT v
    FROM t, UNNEST(x) AS u(v)
    ORDER BY v DESC
    LIMIT 1000
)
```

## Implementation

Efficient implementation of reduce_agg lambda function is not straightforward. Let’s
consider the logic for partial aggregation.

```
auto s = initialState;
for (auto x : input) {
   s = inputFunction(s, x);
}
```

This is a data-dependent loop, i.e. the next loop iteration depends on the results of
the previous iteration. inputFunction needs to be invoked on each input value `x`
separately. Since inputFunction is a user-defined lambda, invoking inputFunction means
evaluating an expression. And since expression evaluation in Velox is optimized for
processing large batches of values at a time, evaluating expressions on one value at
a time is very inefficient. To optimize the implementation of reduce_agg we need to
reduce the number of times we evaluate user-defined lambdas and increase the number
of values we process each time.

One approach is to

1. convert all input values into states by evaluating inputFunction(initialState, x);
1. split states into pairs and evaluate combineFunction on all pairs;
1. repeat step (2) until we have only one state left.

Let’s say we have 1024 values to process. Step 1 evaluates inputFunction expression
on 1024 values at once. Step 2 evaluates combineFunction on 512 pairs, then on 256
pairs, then on 128 pairs, 64, 32, 16, 8, 4, 2, finally producing a single state.
Step 2 evaluates combineFunction 9 times. In total, this implementation evaluates
user-defined expressions 10 times on multiple values each time. This is a lot more
efficient than the original implementation that evaluates user-defined expressions
1024 times.

In general, given N inputs, the original implementation evaluates expressions N times
while the new one log2(N) times.

Note that in case when N is not a power of two, splitting states into pairs may leave
an extra state. For example, splitting 11 states produces 5 pairs + one extra state.
In this case, we set aside the extra state, evaluate combineFunction on 5 pairs, then
bring extra state back to a total of 6 states and continue.

A benchmark, velox/functions/prestosql/aggregates/benchmarks/ReduceAgg.cpp, shows that
initial implementation of reduce_agg is 60x slower than SUM, while the optimized
 implementation is only 3x slower. A specialized aggregation function will always be
 more efficient than generic reduce_agg, hence, reduce_agg should be used only when
 specialized aggregation function is not available.

Finally, a side effect of the optimized implementation is that it doesn't support
applying reduce_agg to sorted inputs. I.e. one cannot use reduce_agg to compute an
equivalent of

```
	SELECT a, array_agg(b ORDER BY b) FROM t GROUP BY 1
```

The array_agg computation depends on order of inputs. A comparable implementation
using reduce_agg would look like this:

```
	SELECT a,
        reduce_agg(b, array[],
                    (s, x) -> concat(s, array[x]),
                    (s, s2) -> concat(s, s2)
                    ORDER BY b)
    FROM t GROUP BY 1
```

To respect ORDER BY b, the reduce_agg would have to apply inputFunction to each
input value one at a time using a data-dependent loop from above. As we saw, this
is very expensive. The optimization we apply does not preserve the order of inputs,
hence, cannot support the query above. Note that
<a href="https://github.com/facebookincubator/velox/issues/6434">Presto</a> doesn't
support applying reduce_agg to sorted inputs either.


Thank you <a href="https://www.linkedin.com/in/orrierling">Orri Erling</a> for brainstorming
and <a href="https://www.linkedin.com/in/xiaoxuanmeng">Xiaoxuan Meng</a> and
<a href="https://www.linkedin.com/in/pedro-pedreira/">Pedro Eugênio Rocha Pedreira</a> for
reviewing the code.
